S3バケットに出力したDynamoDBテーブルのデータ変更履歴をAthenaでクエリしてみた(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
前回の下記エントリでは、DynamoDBテーブルのデータ変更履歴をS3バケットに出力する仕組みをAWS CDKで構成しました。
今回は、S3バケットに出力されたデータを、さらにAmazon Athenaでクエリする構成をAWS CDKで実装してみました。
やってみた
下記の赤枠の部分をAWS CDKで追加で作成します。
CDKコード
前回のエントリのコードに、ハイライトした部分を追記しています。
import * as cdk from '@aws-cdk/core'; import * as dynamodb from '@aws-cdk/aws-dynamodb'; import * as s3 from '@aws-cdk/aws-s3'; import * as kinesis from '@aws-cdk/aws-kinesis'; import * as kinesisfirehose from '@aws-cdk/aws-kinesisfirehose'; import * as kinesisDestinations from '@aws-cdk/aws-kinesisfirehose-destinations'; import * as glue from '@aws-cdk/aws-glue'; import * as athena from '@aws-cdk/aws-athena'; export class AwsCdkAppStack extends cdk.Stack { constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const deviceTableDataChengeLogsBucket = new s3.Bucket( this, 'deviceTableDataChengeLogs' ); const deviceTableDataChangeDataStream = new kinesis.Stream( this, 'deviceTableDataChangeStream', { shardCount: 1, } ); new kinesisfirehose.DeliveryStream( this, 'deviceTableDataChangeDeliveryStream', { sourceStream: deviceTableDataChangeDataStream, destinations: [ new kinesisDestinations.S3Bucket(deviceTableDataChengeLogsBucket, { dataOutputPrefix: 'data/!{timestamp:yyyy/MM/dd/HH/}', errorOutputPrefix: 'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH/}', }), ], } ); new dynamodb.Table(this, 'deviceTable', { tableName: 'deviceTable', partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING }, billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, kinesisStream: deviceTableDataChangeDataStream, }); //Glueデータベース const glueDataBase = new glue.Database(this, 'glueDataBase', { databaseName: 'gluedatabase', }); //Glueテーブル new glue.Table(this, 'glueTable', { bucket: deviceTableDataChengeLogsBucket, s3Prefix: 'data/', database: glueDataBase, tableName: 'gluetable', columns: [ { name: 'eventName', type: glue.Schema.STRING, }, { name: 'dynamodb', type: glue.Schema.struct([ { name: 'ApproximateCreationDateTime', type: glue.Schema.BIG_INT, }, { name: 'Keys', type: glue.Schema.struct([ { name: 'deviceId', type: glue.Schema.struct([ { name: 'S', type: glue.Schema.STRING, }, ]), }, ]), }, { name: 'NewImage', type: glue.Schema.struct([ { name: 'deviceId', type: glue.Schema.struct([ { name: 'S', type: glue.Schema.STRING, }, ]), }, { name: 'deviceName', type: glue.Schema.struct([ { name: 'S', type: glue.Schema.STRING, }, ]), }, ]), }, { name: 'OldImage', type: glue.Schema.struct([ { name: 'deviceId', type: glue.Schema.struct([ { name: 'S', type: glue.Schema.STRING, }, ]), }, { name: 'deviceName', type: glue.Schema.struct([ { name: 'S', type: glue.Schema.STRING, }, ]), }, ]), }, ]), }, ], dataFormat: glue.DataFormat.JSON, }); //Athenaクエリ結果出力先バケット const athenaQueryResultBucket = new s3.Bucket( this, 'athenaQueryResultBucket' ); //Athenaワークグループ new athena.CfnWorkGroup(this, 'athenaWorkGroup', { name: 'athenaWorkGroup', workGroupConfiguration: { resultConfiguration: { outputLocation: `s3://${athenaQueryResultBucket.bucketName}/data`, }, }, }); } }
DynamoSBのデータ変更履歴データ例は下記のようになります。このデータのスキーマをglue.Table()
のcolumns
で定義しています。今回はeventName
とdynamodb
の2スキーマのみとしています。
{ "awsRegion": "ap-northeast-1", "eventID": "a77152e9-4a99-40b6-bd44-ecf4760123e3", "eventName": "INSERT", "userIdentity": null, "recordFormat": "application/json", "tableName": "deviceTable", "dynamodb": { "ApproximateCreationDateTime": 1637419833903, "Keys": { "deviceId": { "S": "d001" } }, "NewImage": { "deviceId": { "S": "d001" }, { "deviceName": { "S": "デバイス001" }, }, "SizeBytes": 24 }, "eventSource": "aws:dynamodb" }
動作
Amazon AthenaのマネジメントコンソールでSELECTクエリを実行してみます。
SELECTを*
で実行してみます。
SELECT * FROM "gluedatabase"."gluetable"
すると入れ子構造のJSONはdynamodb
列のように入れ子構造のまま出力され、扱いにくい形式となってしまいます。
よって入れ子構造のJSON内の値は下記のように.
でアクセスしてas
でカスタム列としフラット化すれば扱いやすくなります。
SELECT eventname, dynamodb.approximatecreationdatetime as approximatecreationdatetime, dynamodb.newimage.deviceid.s as deviceid, dynamodb.newimage.devicename.s as devicename FROM "gluedatabase"."gluetable"
(クエリ実行時のキャプチャは取り忘れましたが、)DATE_FORMAT
やFROM_UNIXTIME
を使えばapproximatecreationdatetimeの形式を変更することも可能です。
SELECT eventname, DATE_FORMAT(FROM_UNIXTIME(dynamodb.approximatecreationdatetime/1000, 'Asia/Tokyo'),'%Y-%m-%d %H:%i:%s') as timestamp, dynamodb.newimage.deviceid.s as deviceid, dynamodb.newimage.devicename.s as devicename FROM "gluedatabase"."gluetable"
注意点
AthenaではUnixTimeミリ秒のデータの方はBIGINTとする必要がある
CDKでのGlueテーブルのcolumnの定義で、ApproximateCreationDateTime
の値はUnixTimeミリ秒となるためBIGINT型として定義する必要があります。
{ name: 'ApproximateCreationDateTime', type: glue.Schema.BIG_INT, },
INT型として定義した場合は下記のHIVE_BAD_DATA
エラーとなります。
これはUnixTimeミリ秒はAthenaのクエリエンジンであるPrestoのINT値の範囲を超えてしまうためです。
Presto の INT 値の範囲が -2147483648~2147483647 であるため、Athena は「49612833315」を解析できませんでした。
参考
- AWS Glueデータカタログへのテーブルの作成とAthenaの設定をCloudFormationでやってみた | DevelopersIO
- 【新機能】Amazon DynamoDB Table を S3 に Export して Amazon Athena でクエリを実行する | DevelopersIO
- JSONSerDe によるマッピングを使って、入れ子の JSON から Amazon Athena のテーブルを作成する | Amazon Web Services ブログ
- prestoの日付関数 - Qiita
- AWS Athenaでのタイムスタンプの扱い方 まとめ - Qiita
- AWS WAFのログをAthenaで整形する - サーバーワークスエンジニアブログ
以上